{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark Train Logistic Regression\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Train Logistic Regression classifier with Apache SparkML"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "export version=`python --version |awk '{print $2}' |awk -F\".\" '{print $1$2}'`\n",
    "\n",
    "echo $version\n",
    "\n",
    "if [ $version == '36' ] || [ $version == '37' ]; then\n",
    "    echo 'Starting installation...'\n",
    "    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "elif [ $version == '38' ] || [ $version == '39' ]; then\n",
    "    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "else\n",
    "    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'\n",
    "    exit -1\n",
    "fi"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf, SQLContext\n",
    "import os\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "from pyspark2pmml import PMMLBuilder\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.feature import MinMaxScaler\n",
    "import logging\n",
    "import shutil\n",
    "import site\n",
    "import sys\n",
    "import wget\n",
    "import re"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if sys.version[0:3] == '3.9':\n",
    "    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'\n",
    "           'jpmml-sparkml-executable-1.7.2.jar')\n",
    "    wget.download(url)\n",
    "    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',\n",
    "                site.getsitepackages()[0] + '/pyspark/jars/')\n",
    "elif sys.version[0:3] == '3.8':\n",
    "    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'\n",
    "           'jpmml-sparkml-executable-1.7.2.jar')\n",
    "    wget.download(url)\n",
    "    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',\n",
    "                site.getsitepackages()[0] + '/pyspark/jars/')\n",
    "elif sys.version[0:3] == '3.7':\n",
    "    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'\n",
    "           'jpmml-sparkml-executable-1.5.12.jar')\n",
    "    wget.download(url)\n",
    "elif sys.version[0:3] == '3.6':\n",
    "    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'\n",
    "           'jpmml-sparkml-executable-1.5.12.jar')\n",
    "    wget.download(url)\n",
    "else:\n",
    "    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '\n",
    "                    'you need a different version please open an issue at '\n",
    "                    'https://github.com/IBM/claimed/issues')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_parquet = os.environ.get('data_parquet',\n",
    "                              'data.parquet')  # input file name (parquet)\n",
    "master = os.environ.get('master',\n",
    "                        \"local[*]\")  # URL to Spark master\n",
    "model_target = os.environ.get('model_target',\n",
    "                              \"model.xml\")  # model output file name\n",
    "data_dir = os.environ.get('data_dir',\n",
    "                          '../../data/')  # temporary directory for data\n",
    "input_columns = os.environ.get('input_columns',\n",
    "                               '[\"x\", \"y\", \"z\"]')  # input columns to consider"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "parameters = list(\n",
    "    map(lambda s: re.sub('$', '\"', s),\n",
    "        map(\n",
    "            lambda s: s.replace('=', '=\"'),\n",
    "            filter(\n",
    "                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
    "                sys.argv\n",
    "            )\n",
    "    )))\n",
    "\n",
    "for parameter in parameters:\n",
    "    logging.warning('Parameter: ' + parameter)\n",
    "    exec(parameter)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "conf = SparkConf().setMaster(master)\n",
    "#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':\n",
    "conf.set(\"spark.jars\", 'jpmml-sparkml-executable-1.5.12.jar')\n",
    "\n",
    "sc = SparkContext.getOrCreate(conf)\n",
    "sqlContext = SQLContext(sc)\n",
    "spark = sqlContext.sparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.parquet(data_dir + data_parquet)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# register a corresponding query table\n",
    "df.createOrReplaceTempView('df')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import DoubleType\n",
    "df = df.withColumn(\"x\", df.x.cast(DoubleType()))\n",
    "df = df.withColumn(\"y\", df.y.cast(DoubleType()))\n",
    "df = df.withColumn(\"z\", df.z.cast(DoubleType()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "splits = df.randomSplit([0.8, 0.2])\n",
    "df_train = splits[0]\n",
    "df_test = splits[1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "indexer = StringIndexer(inputCol=\"class\", outputCol=\"label\")\n",
    "\n",
    "vectorAssembler = VectorAssembler(inputCols=eval(input_columns),\n",
    "                                  outputCol=\"features\")\n",
    "\n",
    "normalizer = MinMaxScaler(inputCol=\"features\", outputCol=\"features_norm\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = pipeline.fit(df_train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "prediction = model.transform(df_train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "binEval = MulticlassClassificationEvaluator(). \\\n",
    "    setMetricName(\"accuracy\"). \\\n",
    "    setPredictionCol(\"prediction\"). \\\n",
    "    setLabelCol(\"label\")\n",
    "\n",
    "binEval.evaluate(prediction)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pmmlBuilder = PMMLBuilder(sc, df_train, model)\n",
    "pmmlBuilder.buildFile(data_dir + model_target)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 55.042719,
   "end_time": "2021-01-28T16:00:26.871724",
   "environment_variables": {},
   "exception": null,
   "input_path": "/home/jovyan/work/elyra-classification/train-trusted-ai.ipynb",
   "output_path": "/home/jovyan/work/elyra-classification/train-trusted-ai.ipynb",
   "parameters": {},
   "start_time": "2021-01-28T15:59:31.829005",
   "version": "2.2.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
